package cn.doitedu.etl;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that correlates ad exposure and ad click events inside a single event stream.
 *
 * <p>Steps performed by {@link #main(String[])}:
 * <ol>
 *   <li>register a Kafka-backed source table holding the widened common-dimension events;</li>
 *   <li>create the view {@code ade_view}, which keeps only {@code ad_show} / {@code ad_click}
 *       events and flattens the ad-related entries of the {@code properties} map;</li>
 *   <li>create the view {@code tracking_view}, which uses {@code MATCH_RECOGNIZE} to pair each
 *       {@code ad_show} with a later {@code ad_click} carrying the same {@code ad_tracking_id};</li>
 *   <li>print the matched rows.</li>
 * </ol>
 *
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">Doit Education</a>
 * @QQ: 657270652
 * @Date: 2022/12/11
 * @Tips: Learn big data at Doit Education
 * @Desc: In-stream correlation of ad exposure and ad click events
 **/
public class E07_EtlJob_AdEventsCEP {

    /**
     * DDL of the Kafka connector source table (widened events with common dimensions).
     *
     * <p>{@code rt} is derived from the epoch-millisecond {@code event_time} column and carries a
     * watermark with a zero interval; it is the ORDER BY column of the MATCH_RECOGNIZE clause in
     * {@link #TRACKING_VIEW_DDL}.
     */
    private static final String SOURCE_TABLE_DDL =
            " CREATE TABLE mall_events_commondim_kfksource(          "
                    + "     user_id           INT,                            "
                    + "     username          string,                         "
                    + "     session_id        string,                         "
                    + "     event_id          string,                         "
                    + "     event_time        bigint,                         "
                    + "     lat               double,                         "
                    + "     lng               double,                         "
                    + "     release_channel   string,                         "
                    + "     device_type       string,                         "
                    + "     properties        map<string,string>,             "
                    + "     register_phone    STRING,                         "
                    + "     user_status       INT,                            "
                    + "     register_time     TIMESTAMP(3),                   "
                    + "     register_gender   INT,                            "
                    + "     register_birthday DATE, register_province STRING, "
                    + "     register_city STRING, register_job STRING, register_source_type INT,   "
                    + "     gps_province   STRING, gps_city STRING, gps_region STRING,             "
                    + "     page_type   STRING, page_service STRING,         "
                    + "     rt as to_timestamp_ltz(event_time,3) ,           "
                    // Watermark declaration: rt is used below as the MATCH_RECOGNIZE ordering column.
                    + "     watermark for  rt as rt - interval '0' seconds   "
                    + " ) WITH (                                             "
                    + "  'connector' = 'kafka',                              "
                    + "  'topic' = 'mall-evts-comdim-w',                     "
                    + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                    + "  'properties.group.id' = 'testGroup',                "
                    + "  'scan.startup.mode' = 'latest-offset',              "
                    + "  'value.format'='json',                              "
                    + "  'value.json.fail-on-missing-field'='false',         "
                    + "  'value.fields-include' = 'EXCEPT_KEY')              ";

    /**
     * View that keeps only ad exposure ({@code ad_show}) and ad click ({@code ad_click}) events
     * and flattens the ad-related keys of the {@code properties} map into plain columns.
     */
    private static final String AD_EVENTS_VIEW_DDL =
            " CREATE TEMPORARY VIEW ade_view AS                   "
                    + " select                                               "
                    + "     user_id,                                         "
                    + "     release_channel,                                 "
                    + "     device_type,                                     "
                    + "     session_id,                                      "
                    + "     event_id,                                        "
                    + "     event_time,                                      "
                    + "     page_type,                                       "
                    + "     properties['url'] as page_url,                   "
                    + "     properties['ad_id'] as ad_id,                    "
                    + "     properties['ad_region_type'] as ad_region_type,  "
                    + "     properties['ad_tracking_id'] as ad_tracking_id,  "
                    + "     rt                                               "
                    + " from mall_events_commondim_kfksource                 "
                    + " where event_id = 'ad_show' or event_id = 'ad_click'  ";

    /**
     * View that runs pattern recognition over {@code ade_view}, per user and ordered by {@code rt}.
     *
     * <p>Pattern {@code A B*? C} within 5 minutes:
     * <ul>
     *   <li>{@code A}: an {@code ad_show} event;</li>
     *   <li>{@code B}: any event whose {@code ad_tracking_id} differs from A's (reluctant);</li>
     *   <li>{@code C}: an {@code ad_click} event with the same {@code ad_tracking_id} as A.</li>
     * </ul>
     *
     * <p>The {@code adrt} measure is projected as {@code ad_region_type}. It is appended as the
     * last output column so the positions of the previously emitted columns do not change.
     */
    private static final String TRACKING_VIEW_DDL =
            " CREATE TEMPORARY VIEW  tracking_view AS                               "
                    + " SELECT                                                                 "
                    + "    uid as user_id,                                                     "
                    + "    ch as release_channel,                                              "
                    + "    dv as device_type,                                                  "
                    + "    url as page_url,                                                    "
                    + "    pt as page_type,                                                    "
                    + "    adid as ad_id,                                                      "
                    + "    tkid as ad_tracking_id,                                             "
                    + "    show_time,                                                          "
                    + "    click_time,                                                         "
                    + "    adrt as ad_region_type                                              "
                    + " FROM ade_view                                                          "
                    + "   MATCH_RECOGNIZE(                                                     "
                    + "   PARTITION BY user_id                                                 "
                    + "   ORDER BY rt                                                          "
                    + "   MEASURES                                                             "
                    + "     A.user_id AS uid,                                                  "
                    + "     A.release_channel AS ch,                                           "
                    + "     A.device_type AS dv,                                               "
                    + "     A.page_url AS url,                                                 "
                    + "     A.ad_region_type AS adrt,                                          "
                    + "     A.page_type AS pt,                                                 "
                    + "     A.ad_id AS adid,                                                   "
                    + "     A.ad_tracking_id AS tkid,                                          "
                    + "     A.event_time AS show_time,                                         "
                    + "     C.event_time as click_time                                         "
                    + "   ONE ROW PER MATCH                                                    "
                    + "   AFTER MATCH SKIP TO NEXT ROW                                         "
                    + "   PATTERN(A B*? C)  WITHIN INTERVAL '5' MINUTE                         "
                    + "   DEFINE                                                               "
                    + "      A  AS   A.event_id = 'ad_show'  ,                                 "
                    + "      B  AS   NOT B.ad_tracking_id = A.ad_tracking_id ,                 "
                    + "      C  AS   C.event_id = 'ad_click' AND  C.ad_tracking_id = A.ad_tracking_id"
                    + "   ) AS t                                                               ";

    /**
     * Job entry point: registers the source table and both views, then prints the matched
     * exposure/click pairs.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamTableEnvironment tEnv = createTableEnvironment();

        // Kafka connector source table: widened events with common dimensions.
        tEnv.executeSql(SOURCE_TABLE_DDL);

        // Keep only ad exposure / click events and flatten their ad-related properties.
        tEnv.executeSql(AD_EVENTS_VIEW_DDL);

        // Debug aid: tEnv.executeSql("select * from ade_view").print();

        // Pattern recognition over the filtered ad exposure / click events.
        tEnv.executeSql(TRACKING_VIEW_DDL);

        tEnv.executeSql("select * from tracking_view").print();

        // TODO: use the stitched exposure/click stream above to lookup-join the request-feature
        //       data stored in HBase: uid, ad_tracking_id, {tag1:xx,tag2:yy}
    }

    /**
     * Builds the table environment on top of a stream environment configured with
     * exactly-once checkpointing every 2000 ms, file-based checkpoint storage and parallelism 1.
     */
    private static StreamTableEnvironment createTableEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);
        return StreamTableEnvironment.create(env);
    }
}
